Data API for Redshiftでデータのロード/アンロードを試してみた
先日Data API for Redshiftが利用可能になりました。 遅ればせながら実際に自分自身で触ってみたかったので、データのロード/アンロード操作を試してみました。 Data API for Redshiftの詳細や使い方等については、下記のアップデートブログがあるので、そちらをご参照ください。
環境
ローカルからAWS CLIを実行しました。バージョンは下記の通りです。 ※今回は事前にAWS CLIの(2020/10/04時点での)最新版が実行環境にインストール済みである前提で進めます。
- Mac OS : 10.15.7
- AWS CLI : 1.18.152
- Redshiftクラスター : 1.0.19097
テーブル作成
任意のRedshiftクラスタを作成し、テーブル作成をします。せっかくなのでテーブルもData APIを実行して作成しました。
テーブル情報、サンプルデータはこちらの一部を拝借しました。
$ aws --profile nagamasa redshift-data execute-statement \ --secret-arn arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX \ --cluster-identifier nagamasa-cls-01 \ --database dev \ --sql "create table part ( p_partkey INTEGER NOT NULL, p_name VARCHAR(22) NOT NULL, p_mfgr VARCHAR(6) NOT NULL, p_category VARCHAR(7) NOT NULL, p_brand1 VARCHAR(9) NOT NULL, p_color VARCHAR(11) NOT NULL, p_type VARCHAR(25) NOT NULL, p_size INTEGER NOT NULL, p_container VARCHAR(10) NOT NULL);" { "ClusterIdentifier": "nagamasa-cls-01", "CreatedAt": 1601765440.339, "Database": "dev", "Id": "11860fc2-bc83-4fe6-a48e-d8dd28cabaf0", "SecretArn": "arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX" } $
ロード
早速データをロードしていきます。 execute-statement
APIで--sql
パラメータにCOPYコマンドを指定して実行します。
$ aws --profile nagamasa redshift-data execute-statement \ --secret-arn arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX \ --cluster-identifier nagamasa-cls-01 \ --database dev \ --sql "copy part from 's3://awssampledbuswest2/ssbgz/part' credentials 'aws_iam_role=arn:aws:iam::XXXXXXXXXXXX:role/myRedshiftRole' gzip region 'us-west-2';" { "ClusterIdentifier": "nagamasa-cls-01", "CreatedAt": 1601765979.623, "Database": "dev", "Id": "7e84ad4b-f20a-47f1-812b-efc4798e5b1d", "SecretArn": "arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX" } $
返却されたレスポンスのIdの値(SQLステートメントのID)を describe-statement
APIの--id
パラメータに指定して実行すると、SQLステートメントの詳細が取得できました。StatusがFINISHED
となっていたので非同期でのCOPYコマンドの完了が確認できました。
実際に実装する際は、StatusがFINISHED
になっていることが確認できたら後続処理に進むなどの実装になりそうですね!
$ aws --profile nagamasa redshift-data describe-statement --id 7e84ad4b-f20a-47f1-812b-efc4798e5b1d { "ClusterIdentifier": "nagamasa-cls-01", "CreatedAt": 1601765979.623, "Duration": 10725667637, "Id": "7e84ad4b-f20a-47f1-812b-efc4798e5b1d", "QueryString": "copy part from 's3://awssampledbuswest2/ssbgz/part'\ncredentials ''\ngzip region 'us-west-2';", "RedshiftPid": 25465, "RedshiftQueryId": -1, "ResultRows": 0, "ResultSize": 0, "SecretArn": "arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX", "Status": "FINISHED", "UpdatedAt": 1601765991.011 } $
ちゃんとデータもロードされていました〜。
dev=# select count(*) from part; count --------- 1400000 (1 row) dev=#
アンロード
次にデータをS3にアンロードしていきます。COPYコマンド同様に、execute-statement
APIで--sql
パラメータにUNLOADコマンドを指定して実行します。
$ aws --profile nagamasa redshift-data execute-statement \ --secret-arn arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX \ --cluster-identifier nagamasa-cls-01 \ --database dev \ --sql "unload ('select p_partkey, p_name, p_type, p_container from part order by p_partkey') to 's3://nagamasa-test/output/part' credentials 'aws_iam_role=arn:aws:iam::XXXXXXXXXXXX:role/myRedshiftRole' header parallel off csv;" { "ClusterIdentifier": "nagamasa-cls-01", "CreatedAt": 1601766883.444, "Database": "dev", "Id": "51524ab2-3ea8-47e4-9b28-0fbda3be8466", "SecretArn": "arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX" } $
アンロードも問題なくでき、データも確認できました〜
$ aws --profile nagamasa s3 cp s3://nagamasa-test/output/part000 - 2>/dev/null | head p_partkey,p_name,p_type,p_container 1,lace spring,PROMO BURNISHED COPPER,JUMBO PKG 2,rosy metallic,LARGE BRUSHED BRASS,LG CASE 3,green antique,STANDARD POLISHED BRASS,WRAP CASE 4,metallic smoke,SMALL PLATED BRASS,MED DRUM 5,blush chiffon,STANDARD POLISHED TIN,SM PKG 6,ivory azure,PROMO PLATED STEEL,MED BAG 7,blanched tan,SMALL PLATED COPPER,SM BAG 8,khaki cream,PROMO BURNISHED TIN,LG DRUM 9,rose moccasin,SMALL BURNISHED STEEL,WRAP CASE $
ちなみに①
Data API for Redshiftでは非同期でSQLクエリが実行されるということで、Redshiftクラスタの状態が[変更中]でも実行できたりするのかな?と思い試してみましたが、下記のようにData APIでエラーレスポンスが返却されました。(まあ、そうですよね)
An error occurred (ValidationException) when calling the ExecuteStatement operation: Redshift cluster status 'MODIFY_CLUSTER_IAM_ROLES' is not allowed
ちなみに②
Lambdaでの実行も試してみましたが、Lambda内部で呼ばれているSDKが更新されてないのか現時点(2020/10/04)でもまだ正常実行が確認できませんでした。 ※下記の条件で試しましたが、いずれも同じ実行結果(エラー)でした。
- Python3.7, Python3.6, Python2.7
- バージニア北部(us-east-1)リージョン
- IAM ロールに AmazonRedshiftDataFullAccess ポリシーをアタッチ
import json import boto3 def lambda_handler(event, context): rs_client = boto3.client('redshift-data') res = rs_client.execute_statement( ClusterIdentifier = 'nagamasa-cls-01', secretArn = 'arn:aws:secretsmanager:us-east-1:XXXXXXXXXXXX:secret:XXXXXXXXXXXXXXXXXX', Database = 'dev', Sql = 'select count(*) from part;' ) print (res)
{ "errorMessage": "Unknown service: 'redshift-data'. Valid service names are: accessanalyzer, acm, acm-pca, alexaforbusiness, amplify, apigateway, apigatewaymanagementapi, apigatewayv2, appconfig, application-autoscaling, application-insights, appmesh, appstream, appsync, athena, autoscaling, autoscaling-plans, backup, batch, braket, budgets, ce, chime, cloud9, clouddirectory, cloudformation, cloudfront, cloudhsm, cloudhsmv2, cloudsearch, cloudsearchdomain, cloudtrail, cloudwatch, codeartifact, codebuild, codecommit, codedeploy, codeguru-reviewer, codeguruprofiler, codepipeline, codestar, codestar-connections, codestar-notifications, cognito-identity, cognito-idp, cognito-sync, comprehend, comprehendmedical, compute-optimizer, config, connect, connectparticipant, cur, dataexchange, datapipeline, datasync, dax, detective, devicefarm, directconnect, discovery, dlm, dms, docdb, ds, dynamodb, dynamodbstreams, ebs, ec2, ec2-instance-connect, ecr, ecs, efs, eks, elastic-inference, elasticache, elasticbeanstalk, elastictranscoder, elb, elbv2, emr, es, events, firehose, fms, forecast, forecastquery, frauddetector, fsx, gamelift, glacier, globalaccelerator, glue, greengrass, groundstation, guardduty, health, honeycode, iam, identitystore, imagebuilder, importexport, inspector, iot, iot-data, iot-jobs-data, iot1click-devices, iot1click-projects, iotanalytics, iotevents, iotevents-data, iotsecuretunneling, iotsitewise, iotthingsgraph, ivs, kafka, kendra, kinesis, kinesis-video-archived-media, kinesis-video-media, kinesis-video-signaling, kinesisanalytics, kinesisanalyticsv2, kinesisvideo, kms, lakeformation, lambda, lex-models, lex-runtime, license-manager, lightsail, logs, machinelearning, macie, macie2, managedblockchain, marketplace-catalog, marketplace-entitlement, marketplacecommerceanalytics, mediaconnect, mediaconvert, medialive, mediapackage, mediapackage-vod, mediastore, mediastore-data, mediatailor, meteringmarketplace, mgh, migrationhub-config, mobile, mq, mturk, neptune, networkmanager, opsworks, opsworkscm, organizations, outposts, personalize, personalize-events, personalize-runtime, pi, pinpoint, pinpoint-email, pinpoint-sms-voice, polly, pricing, qldb, qldb-session, quicksight, ram, rds, rds-data, redshift, rekognition, resource-groups, resourcegroupstaggingapi, robomaker, route53, route53domains, route53resolver, s3, s3control, sagemaker, sagemaker-a2i-runtime, sagemaker-runtime, savingsplans, schemas, sdb, secretsmanager, securityhub, serverlessrepo, service-quotas, servicecatalog, servicediscovery, ses, sesv2, shield, signer, sms, sms-voice, snowball, sns, sqs, ssm, sso, sso-oidc, stepfunctions, storagegateway, sts, support, swf, synthetics, textract, transcribe, transfer, translate, waf, waf-regional, wafv2, workdocs, worklink, workmail, workmailmessageflow, workspaces, xray", "errorType": "UnknownServiceError", (後略) }
おわりに
アップデートから少し時間も経っていたので、本当はLambdaでの実行確認をメインにした記事内容にしたくて試してみたのですが、まだLambdaでの正常実行は確認できませんでした。 Data API for Redshiftに触れることが目的の簡単な記事でしたが、自分自身で触ってみて使いやすさを実感できたことは良かったです。 Lambdaでも実行できるようになったら、どんどん活用できそうですね! 以上、データアナリティクス事業本部のナガマサでした〜